package com.simafei.flow.web.service.impl;

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.fasterxml.jackson.core.type.TypeReference;
import com.simafei.flow.core.Edge;
import com.simafei.flow.core.EdgeResult;
import com.simafei.flow.core.Flow;
import com.simafei.flow.core.FlowExecution;
import com.simafei.flow.core.FlowFactory;
import com.simafei.flow.core.FlowResult;
import com.simafei.flow.core.FlowResumption;
import com.simafei.flow.core.Node;
import com.simafei.flow.core.NodeResult;
import com.simafei.flow.core.api.Api;
import com.simafei.flow.core.api.SerializeType;
import com.simafei.flow.core.common.Criteria;
import com.simafei.flow.core.common.NodeType;
import com.simafei.flow.core.common.Variable;
import com.simafei.flow.core.data.AggSpec;
import com.simafei.flow.core.data.LoadSpec;
import com.simafei.flow.core.data.StoreSpec;
import com.simafei.flow.core.json.JsonUtils;
import com.simafei.flow.core.rule.FlowRules;
import com.simafei.flow.web.common.PageUtils;
import com.simafei.flow.web.common.error.BizErrorCode;
import com.simafei.flow.web.common.error.BizException;
import com.simafei.flow.web.dao.FlowMapper;
import com.simafei.flow.web.dao.GroupMapper;
import com.simafei.flow.web.domain.graph.FlowGraph;
import com.simafei.flow.web.domain.req.FlowExecReq;
import com.simafei.flow.web.domain.req.FlowReq;
import com.simafei.flow.web.domain.req.PageParam;
import com.simafei.flow.web.domain.req.ResumeReq;
import com.simafei.flow.web.domain.resp.FlowResp;
import com.simafei.flow.web.domain.resp.FlowResultResp;
import com.simafei.flow.web.domain.resp.PageResult;
import com.simafei.flow.web.domain.resp.VariableResp;
import com.simafei.flow.web.entity.ApiPO;
import com.simafei.flow.web.entity.EdgePO;
import com.simafei.flow.web.entity.EdgeResultPO;
import com.simafei.flow.web.entity.FlowPO;
import com.simafei.flow.web.entity.GroupPO;
import com.simafei.flow.web.entity.NodePO;
import com.simafei.flow.web.entity.NodeResultPO;
import com.simafei.flow.web.entity.VariablePO;
import com.simafei.flow.web.service.IApiService;
import com.simafei.flow.web.service.IEdgeResultService;
import com.simafei.flow.web.service.IEdgeService;
import com.simafei.flow.web.service.IFlowResultService;
import com.simafei.flow.web.service.IFlowService;
import com.simafei.flow.web.service.INodeResultService;
import com.simafei.flow.web.service.INodeService;
import com.simafei.flow.web.service.IVariableService;
import com.simafei.flow.web.transfer.EdgeResultTransfer;
import com.simafei.flow.web.transfer.FlowTransfer;
import com.simafei.flow.web.transfer.NodeResultTransfer;
import com.simafei.flow.web.transfer.VariableTransfer;
import com.simafei.flow.web.util.SseHelper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * <p>
 * Decision flow service implementation.
 * </p>
 *
 * @author fengpengju
 * @since 2024-06-18
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class FlowServiceImpl extends ServiceImpl<FlowMapper, FlowPO> implements IFlowService {

    private final INodeService nodeService;

    private final IEdgeService edgeService;

    private final IVariableService variableService;

    private final FlowFactory flowFactory;

    private final FlowExecutionListener executionListener;

    private final IApiService apiService;

    private final IFlowResultService flowResultService;

    private final INodeResultService nodeResultService;

    private final IEdgeResultService edgeResultService;

    private final SseHelper sseHelper;

    // Injected as a mapper to avoid a circular dependency
    private final GroupMapper groupMapper;

    /**
     * Creates a new decision flow together with its owning group and optional variables.
     *
     * <p>A {@link GroupPO} is inserted first from the request's name, description and
     * category; the flow is then saved as that group's initial version ({@code "V0"},
     * weight {@code 100}). Variables on the request, if any, are saved under the new
     * group's id, which is the key {@link #getFlowVars(Long)} and {@link #delete(Long)}
     * use to find them.
     *
     * @param req the flow creation request
     * @return the response view of the saved flow
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public FlowResp add(FlowReq req) {
        GroupPO group = new GroupPO();
        group.setGroupName(req.getFlowName());
        group.setGroupDesc(req.getFlowDesc());
        group.setCategoryId(req.getCategoryId());
        groupMapper.insert(group);

        FlowPO po = FlowTransfer.INSTANCE.toPo(req);
        po.setGroupId(group.getId());
        po.setWeight(100);
        po.setVersionNo("V0");
        save(po);

        if (CollectionUtil.isNotEmpty(req.getVariables())) {
            List<VariablePO> variables = VariableTransfer.INSTANCE.toPo(req.getVariables());
            // Key the variables by the group id, not the flow id: every reader in this
            // class filters VariablePO by the flow's group id.
            variables.forEach(variable -> variable.setGroupId(group.getId()));
            variableService.saveBatch(variables);
        }
        return FlowTransfer.INSTANCE.toResp(po);
    }

    /**
     * Stores the graph of a flow: the raw graph string on the flow row, plus one row per
     * node and per edge.
     *
     * <p>Existing node and edge rows of the flow are removed and replaced by the ones
     * carried in {@code graph}.
     *
     * @param graph the graph to persist, including the target flow id
     * @return the flow as returned by {@link #getSimple(Long)}
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public FlowResp addGraph(FlowGraph graph) {
        update(graph.getFlowId(), graph.getGraph());

        // Build the replacement node rows before touching the existing ones.
        List<NodePO> nodeRows = new ArrayList<>();
        for (var graphNode : graph.getNodes()) {
            NodePO row = new NodePO();
            row.setFlowId(graph.getFlowId());
            row.setNodeId(graphNode.getId());
            row.setNodeName(graphNode.getName());
            row.setNodeType(graphNode.getNodeType().name());
            row.setBlocked(graphNode.isBlocked());
            row.setVariables(JsonUtils.toJsonString(graphNode.getVariables()));
            row.setPayload(JsonUtils.toJsonString(graphNode.getPayload()));
            row.setExtend(JsonUtils.toJsonString(graphNode.extend()));
            nodeRows.add(row);
        }
        LambdaUpdateWrapper<NodePO> nodesOfFlow = new LambdaUpdateWrapper<NodePO>()
                .eq(NodePO::getFlowId, graph.getFlowId());
        nodeService.remove(nodesOfFlow);
        nodeService.saveBatch(nodeRows);

        // Same replace-all approach for the edges.
        List<EdgePO> edgeRows = new ArrayList<>();
        for (var graphEdge : graph.getEdges()) {
            EdgePO row = new EdgePO();
            row.setFlowId(graph.getFlowId());
            row.setFromNodeId(graphEdge.getFromId());
            row.setToNodeId(graphEdge.getToId());
            row.setEdgeId(graphEdge.getId());
            row.setCriteria(JsonUtils.toJsonString(graphEdge.getCriteria()));
            edgeRows.add(row);
        }
        LambdaUpdateWrapper<EdgePO> edgesOfFlow = new LambdaUpdateWrapper<EdgePO>()
                .eq(EdgePO::getFlowId, graph.getFlowId());
        edgeService.remove(edgesOfFlow);
        edgeService.saveBatch(edgeRows);

        return getSimple(graph.getFlowId());
    }

    /**
     * Loads a flow by primary key with all of its columns.
     *
     * @param flowId the flow id
     * @return the response view of the flow row returned by {@code getById}
     */
    @Override
    public FlowResp get(Long flowId) {
        return FlowTransfer.INSTANCE.toResp(getById(flowId));
    }

    /**
     * Loads a flow by id, selecting only the columns listed in {@link #buildWrapper()}
     * (the graph column is not among them).
     *
     * @param flowId the flow id
     * @return the response view of the matching row
     */
    @Override
    public FlowResp getSimple(Long flowId) {
        LambdaQueryWrapper<FlowPO> byId = buildWrapper().eq(FlowPO::getId, flowId);
        FlowPO row = getOne(byId);
        return FlowTransfer.INSTANCE.toResp(row);
    }

    /**
     * Overwrites the stored graph string of a flow.
     *
     * @param flowId the id of the flow to update
     * @param graph  the new graph value
     */
    @Override
    public void update(Long flowId, String graph) {
        LambdaUpdateWrapper<FlowPO> setGraph = new LambdaUpdateWrapper<>(FlowPO.class);
        setGraph.eq(FlowPO::getId, flowId);
        setGraph.set(FlowPO::getGraph, graph);
        update(setGraph);
    }

    /**
     * Deletes a flow together with its node and edge rows, and the variables stored
     * under the flow's group id.
     *
     * @param flowId the id of the flow to delete
     * @return always {@code true}
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean delete(Long flowId) {
        // Read the flow before removing it: its group id is needed for the variables.
        FlowResp existing = getSimple(flowId);

        removeById(flowId);
        nodeService.remove(new LambdaQueryWrapper<NodePO>().eq(NodePO::getFlowId, flowId));
        edgeService.remove(new LambdaQueryWrapper<EdgePO>().eq(EdgePO::getFlowId, flowId));

        if (existing == null) {
            return true;
        }
        // NOTE(review): variables are removed by group id, and newVersion() creates flows
        // that share a group id - confirm that deleting one version should drop them.
        LambdaQueryWrapper<VariablePO> groupVariables = new LambdaQueryWrapper<VariablePO>()
                .eq(VariablePO::getGroupId, existing.getGroupId());
        variableService.remove(groupVariables);
        return true;
    }

    /**
     * Creates a new version of an existing flow inside the same group.
     *
     * <p>The new row copies the name, category, graph and group id of the source flow,
     * starts with weight {@code 0}, and receives the next version number after the
     * highest numeric suffix currently stored for the group.
     *
     * @param flowId the id of the flow to derive the new version from
     * @return the response view of the newly saved flow
     * @throws BizException with {@code FLOW_NOT_EXIST} when no flow has the given id
     */
    @Override
    public FlowResp newVersion(Long flowId) {
        FlowPO flow = getById(flowId);
        if (Objects.isNull(flow)) {
            throw new BizException(BizErrorCode.FLOW_NOT_EXIST, flowId);
        }
        Map<String, Object> maxVersionRow = getMap(new QueryWrapper<FlowPO>()
                .select("max(CAST(SUBSTRING(version_no, 2) as UNSIGNED)) as versionNo")
                .eq("group_id", flow.getGroupId()));

        // The aggregate is SQL NULL when no row of the group has a version number. In
        // that case the row map may be null or lack the key, so fall back to "0"
        // instead of dereferencing it.
        Object maxVersion = Objects.isNull(maxVersionRow) ? null : maxVersionRow.get("versionNo");
        String currentVersion = Objects.isNull(maxVersion) ? "0" : maxVersion.toString();

        FlowPO newFlow = new FlowPO();
        newFlow.setFlowName(flow.getFlowName());
        newFlow.setCategoryId(flow.getCategoryId());
        newFlow.setGraph(flow.getGraph());
        newFlow.setWeight(0);
        newFlow.setVersionNo(getNextVersion(currentVersion));
        newFlow.setGroupId(flow.getGroupId());
        save(newFlow);
        return FlowTransfer.INSTANCE.toResp(newFlow);
    }

    /**
     * Formats the version label that follows the given numeric version.
     *
     * @param currentVersion the current version number as a decimal string, without prefix
     * @return {@code "V"} followed by the incremented number
     * @throws NumberFormatException if {@code currentVersion} is not a parsable integer
     */
    private String getNextVersion(String currentVersion) {
        int next = Integer.parseInt(currentVersion) + 1;
        return "V" + next;
    }

    /**
     * Returns one page of flows ordered by creation time, newest first.
     *
     * @param param the page number and page size
     * @return the page converted to response objects
     */
    @Override
    public PageResult<FlowResp> queryPage(PageParam param) {
        Page<FlowPO> request = new Page<>(param.getPageNo(), param.getPageSize());
        LambdaQueryWrapper<FlowPO> newestFirst = new LambdaQueryWrapper<FlowPO>()
                .orderByDesc(FlowPO::getCreateTime);
        Page<FlowPO> result = page(request, newestFirst);
        return PageUtils.buildPageResult(result, FlowTransfer.INSTANCE::toResp);
    }

    /**
     * Lists the flows of a category, selecting only the columns from {@link #buildWrapper()}.
     *
     * @param categoryId the category id to filter by
     * @return the matching flows as response objects
     */
    @Override
    public List<FlowResp> listByCategory(Long categoryId) {
        LambdaQueryWrapper<FlowPO> inCategory = buildWrapper().eq(FlowPO::getCategoryId, categoryId);
        return FlowTransfer.INSTANCE.toResp(list(inCategory));
    }

    /**
     * Lists the flows of a group, selecting only the columns from {@link #buildWrapper()}.
     *
     * @param groupId the group id to filter by
     * @return the matching flows as response objects
     */
    @Override
    public List<FlowResp> listByGroup(Long groupId) {
        LambdaQueryWrapper<FlowPO> inGroup = buildWrapper().eq(FlowPO::getGroupId, groupId);
        return FlowTransfer.INSTANCE.toResp(list(inGroup));
    }

    /**
     * Assembles an executable {@link Flow} from the persisted node and edge rows of a flow.
     *
     * @param flowId the id of the flow to build
     * @return a flow containing one {@link Node} per node row and one {@link Edge} per edge row
     * @throws BizException with {@code FLOW_NOT_EXIST} when no flow has the given id
     */
    @Override
    public Flow buildFlow(Long flowId) {
        if (getSimple(flowId) == null) {
            throw new BizException(BizErrorCode.FLOW_NOT_EXIST, flowId);
        }

        Flow flow = new Flow();
        flow.setId(String.valueOf(flowId));

        // Index the built nodes by node id so the edges below can be wired to them.
        Map<String, Node> nodesById = MapUtil.newHashMap();
        LambdaQueryWrapper<NodePO> nodesOfFlow = new LambdaQueryWrapper<NodePO>().eq(NodePO::getFlowId, flowId);
        for (NodePO nodeRow : nodeService.list(nodesOfFlow)) {
            Node built = buildNode(nodeRow);
            nodesById.put(nodeRow.getNodeId(), built);
            flow.addNode(built);
        }

        LambdaQueryWrapper<EdgePO> edgesOfFlow = new LambdaQueryWrapper<EdgePO>().eq(EdgePO::getFlowId, flowId);
        for (EdgePO edgeRow : edgeService.list(edgesOfFlow)) {
            Edge link = new Edge();
            link.setId(edgeRow.getEdgeId());
            link.setFrom(nodesById.get(edgeRow.getFromNodeId()));
            link.setTo(nodesById.get(edgeRow.getToNodeId()));
            link.setCriteria(JsonUtils.parseObject(edgeRow.getCriteria(), Criteria.class));
            flow.addEdge(link);
        }
        return flow;
    }

    /**
     * Executes a flow in non-test mode and maps the outcome to a response object.
     *
     * @param flowId the id of the flow to execute
     * @param param  the input parameters passed to the flow
     * @return the execution id, statuses, serialized parameters/results and, when the
     *         result carries causes, their messages joined by newlines
     */
    @Override
    public FlowResultResp execute(Long flowId, Map<String, Object> param) {
        Flow flow = buildFlow(flowId);
        FlowExecution execution = buildFlowExecution(false);
        FlowResult outcome = flow.execute(execution, param);

        // TODO: decide whether to load this via IFlowResultService#getByExecId() instead
        FlowResultResp resp = new FlowResultResp();
        resp.setExecId(execution.getExecId());
        resp.setFlowId(String.valueOf(flowId));
        resp.setExecParam(JsonUtils.toJsonString(param));
        resp.setExecResult(JsonUtils.toJsonString(outcome.getExecResults()));
        resp.setExecStatus(outcome.getExecStatus());
        resp.setFlowStatus(outcome.getFlowStatus());
        resp.setTestMode(execution.isTest());
        resp.setExecMode(outcome.getExecMode());
        resp.setSpendTime(outcome.getSpendTime());

        if (CollectionUtil.isNotEmpty(outcome.getCauses())) {
            String joinedMessages = outcome.getCauses().stream()
                    .map(Throwable::getMessage)
                    .collect(Collectors.joining("\n"));
            resp.setErrorMsg(joinedMessages);
        }
        return resp;
    }

    /**
     * Starts a non-test execution of a flow through {@code Flow#executeAsync} and returns
     * its execution id.
     *
     * @param flowId the id of the flow to execute
     * @param param  the input parameters passed to the flow
     * @return the id generated for this execution
     */
    @Override
    public String executeAsync(Long flowId, Map<String, Object> param) {
        FlowExecution execution = buildFlowExecution(false);
        buildFlow(flowId).executeAsync(execution, param);
        return execution.getExecId();
    }

    /**
     * Starts a test-mode execution of a flow and returns an SSE emitter registered under
     * the execution id.
     *
     * @param req the flow id and input parameters
     * @return the emitter (60 second timeout) registered with the {@link SseHelper}
     */
    @Override
    public SseEmitter test(FlowExecReq req) {
        Flow flow = buildFlow(req.getFlowId());
        FlowExecution execution = buildFlowExecution(true);

        final long timeoutMillis = 60_000L;
        SseEmitter emitter = new SseEmitter(timeoutMillis);
        // Register before starting the execution so the emitter is in place first.
        sseHelper.addEmitter(execution.getExecId(), emitter);

        flow.executeAsync(execution, req.getParam());
        return emitter;
    }

    /**
     * Resumes a previously started execution from a given node and returns an SSE emitter
     * registered under the execution id.
     *
     * <p>The stored flow result, the flow definition and the target node are validated
     * first. The emitter is created and registered only after validation succeeds, so a
     * rejected request does not leave an emitter behind in the {@link SseHelper} that is
     * never handed to the client.
     *
     * @param req the execution id, flow id, node id and resume parameters
     * @return the emitter (60 second timeout) registered for the resumed execution
     * @throws BizException with {@code FLOW_RESUME_ERROR} when no result exists for the
     *                      execution id or the node is not part of the flow, and with
     *                      {@code FLOW_NOT_EXIST} when the flow itself is missing
     */
    @Override
    public SseEmitter resume(ResumeReq req) {
        FlowResultResp resultResp = flowResultService.getByExecId(req.getExecId());
        if (Objects.isNull(resultResp)) {
            throw new BizException(BizErrorCode.FLOW_RESUME_ERROR);
        }

        Flow flow = buildFlow(req.getFlowId());
        Node resumeNode = flow.getNodes().stream()
                .filter(n -> n.getId().equals(req.getNodeId()))
                .findFirst()
                .orElseThrow(() -> new BizException(BizErrorCode.FLOW_RESUME_ERROR));

        // Results of nodes that have already been executed
        List<NodeResultPO> nodeResults = nodeResultService.list(new LambdaQueryWrapper<NodeResultPO>()
                .eq(NodeResultPO::getExecId, req.getExecId())
                .eq(NodeResultPO::getFlowId, req.getFlowId()));
        Set<NodeResult> nodeResultsSet = new HashSet<>(NodeResultTransfer.INSTANCE.toBo(nodeResults));

        // Results of edges that have already been executed
        List<EdgeResultPO> edgeResults = edgeResultService.list(new LambdaQueryWrapper<EdgeResultPO>()
                .eq(EdgeResultPO::getExecId, req.getExecId())
                .eq(EdgeResultPO::getFlowId, req.getFlowId()));
        Set<EdgeResult> edgeResultsSet = new HashSet<>(EdgeResultTransfer.INSTANCE.toBo(edgeResults));

        FlowResumption resumeContext = FlowResumption.builder()
                .execId(req.getExecId())
                .flowId(String.valueOf(req.getFlowId()))
                .flowStatus(req.getFlowStatus())
                .test(resultResp.getTestMode())
                .node(resumeNode)
                .parentId(req.getParentId())
                .executedNodeResults(nodeResultsSet)
                .executedEdgeResults(edgeResultsSet)
                .flowInputParams(transferParams(resultResp.getExecParam()))
                .resumeParams(req.getExecParams())
                .executionListener(this.executionListener)
                .build();

        // Register the emitter only now that nothing above can reject the request, and
        // before the resumed execution starts.
        SseEmitter emitter = new SseEmitter(60_000L);
        sseHelper.addEmitter(req.getExecId(), emitter);
        flow.resumeAsync(resumeContext);
        return emitter;
    }

    /**
     * Lists the variables stored under the group of the given flow.
     *
     * @param flowId the flow id
     * @return the variables of the flow's group, or a new empty list when the flow is not found
     */
    @Override
    public List<VariableResp> getFlowVars(Long flowId) {
        FlowResp flow = getSimple(flowId);
        if (flow == null) {
            return new ArrayList<>();
        }
        LambdaQueryWrapper<VariablePO> byGroup = new LambdaQueryWrapper<VariablePO>()
                .eq(VariablePO::getGroupId, flow.getGroupId());
        return VariableTransfer.INSTANCE.toResp(variableService.list(byGroup));
    }

    /**
     * Converts a persisted node row into an executable {@link Node} via the {@link FlowFactory}.
     *
     * <p>The {@code extend} column is interpreted according to the node type: a list of
     * output variables for END, a spec object for the DATA_* and RULE types, and the id
     * of an API definition for API nodes. Types without a dedicated branch are created
     * through the generic factory method.
     *
     * @param node the persisted node row
     * @return the node built for the row's type
     * @throws IllegalArgumentException if the stored node type is not a {@link NodeType} constant
     * @throws NullPointerException     if an API node refers to an API definition that
     *                                  {@code apiService.getById} does not return
     */
    private Node buildNode(NodePO node) {
        NodeType nodeType = NodeType.valueOf(node.getNodeType());
        return switch (nodeType) {
            case START -> flowFactory.createStartNode(node.getNodeId(), node.getNodeName());
            case END -> {
                List<Variable> outVars = JsonUtils.parseArray(node.getExtend(), Variable.class);
                yield flowFactory.createEndNode(node.getNodeId(), node.getNodeName(), outVars);
            }
            case DATA_AGG -> {
                AggSpec aggSpec = JsonUtils.parseObject(node.getExtend(), AggSpec.class);
                yield flowFactory.createAggNode(node.getNodeId(), node.getNodeName(), aggSpec);
            }
            case DATA_LOAD -> {
                LoadSpec loadSpec = JsonUtils.parseObject(node.getExtend(), LoadSpec.class);
                yield flowFactory.createDataNode(node.getNodeId(), node.getNodeName(), loadSpec);
            }
            case DATA_MODIFY -> {
                StoreSpec storeSpec = JsonUtils.parseObject(node.getExtend(), StoreSpec.class);
                yield flowFactory.createStoreNode(node.getNodeId(), node.getNodeName(), storeSpec);
            }
            case RULE -> {
                FlowRules flowRules = JsonUtils.parseObject(node.getExtend(), FlowRules.class);
                yield flowFactory.createRuleNode(node.getNodeId(), node.getNodeName(), flowRules);
            }
            case API -> {
                String apiId = node.getExtend();
                // Fail with a descriptive message rather than a bare NPE when the
                // referenced API definition cannot be loaded.
                ApiPO api = Objects.requireNonNull(apiService.getById(apiId),
                        () -> "API definition not found for node " + node.getNodeId() + " (apiId=" + apiId + ")");
                Api build = Api.builder().httpUrl(api.getHttpUrl())
                        .httpMethod(api.getHttpMethod())
                        .template(api.getTemplate())
                        .serializeType(SerializeType.of(api.getSerializeType()))
                        .serializeScript(api.getSerializeScript()).build();
                yield flowFactory.createApiNode(node.getNodeId(), node.getNodeName(), build);
            }
            case MANUAL -> flowFactory.createManualNode(node.getNodeId(), node.getNodeName());
            default -> flowFactory.createNode(node.getNodeId(), node.getNodeName(), node.getBlocked(), nodeType);
        };
    }

    /**
     * Creates an execution context with a freshly generated execution id and this
     * service's execution listener.
     *
     * @param isTest whether the execution is flagged as a test run
     * @return the new execution context
     */
    private FlowExecution buildFlowExecution(boolean isTest) {
        return FlowExecution.builder()
                .execId(IdUtil.fastSimpleUUID())
                .test(isTest)
                .executionListener(executionListener)
                .build();
    }

    /**
     * Creates a query wrapper restricted to the summary columns of a flow; the graph
     * column is not selected.
     *
     * @return a new wrapper with the column selection applied and no conditions
     */
    private LambdaQueryWrapper<FlowPO> buildWrapper() {
        LambdaQueryWrapper<FlowPO> summaryColumns = new LambdaQueryWrapper<>();
        summaryColumns.select(
                FlowPO::getId,
                FlowPO::getFlowName,
                FlowPO::getCategoryId,
                FlowPO::getGroupId,
                FlowPO::getVersionNo,
                FlowPO::getWeight,
                FlowPO::getStatus,
                FlowPO::getValid,
                FlowPO::getCreateTime,
                FlowPO::getUpdateTime);
        return summaryColumns;
    }

    /**
     * Parses a serialized execution-parameter JSON string into a map.
     *
     * @param execParam the JSON text of the execution parameters
     * @return the map produced by {@code JsonUtils.parseObject}
     */
    private Map<String, Object> transferParams(String execParam) {
        TypeReference<Map<String, Object>> mapType = new TypeReference<Map<String, Object>>() {
        };
        return JsonUtils.parseObject(execParam, mapType);
    }
}
